{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-versioned-pipeline-endpoints.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "\n",
        "# How to Setup a PipelineEndpoint and Submit a Pipeline Using the PipelineEndpoint.\n",
        "In this notebook, we will see how to setup a PipelineEndpoint and run a specific pipeline version.\n",
        "\n",
        "PipelineEndpoint can be used to update a published pipeline while maintaining the same endpoint.\n",
        "PipelineEndpoint provides a way to keep track of [PublishedPipelines](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.publishedpipeline) using versions. PipelineEndpoint uses endpoint with version information to trigger an underlying published pipeline. Pipeline endpoints are uniquely named within a workspace.  \n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Prerequisites and AML Basics\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Notebook Overview\n",
        "In this notebook, we provide an introduction to Azure machine learning PipelineEndpoints. It covers:\n",
        "* [Create PipelineEndpoint](#Create-PipelineEndpoint), How to create PipelineEndpoint.\n",
        "* [Retrieving PipelineEndpoint](#Retrieving-PipelineEndpoint), How to get specific PipelineEndpoint from worskpace by name/Id and get all [PipelineEndpoints](#Get-all-PipelineEndpoints-in-workspace) within workspace.\n",
        "* [PipelineEndpoint Properties](#PipelineEndpoint-properties). How to get and set PipelineEndpoint properties, such as default version of PipelineEndpoint.\n",
        "* [PipelineEndpoint Submission](#PipelineEndpoint-Submission). How to run a Pipeline using PipelineEndpoint."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "###  Create PipelineEndpoint\n",
        "Following are required input parameters to create PipelineEndpoint:\n",
        "\n",
        "* *workspace*: AML workspace.\n",
        "* *name*: name of PipelineEndpoint, it is unique within workspace.\n",
        "* *description*: description details for PipelineEndpoint.\n",
        "* *pipeline*: A [Pipeline](#Steps-to-create-simple-Pipeline) or [PublishedPipeline](#Publish-Pipeline), to set default version of PipelineEndpoint.                                                       "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "####  Initialization, Steps to create a Pipeline\n",
        "\n",
        "The best practice is to use separate folders for scripts and its dependent files for each step and specify that folder as the `source_directory` for the step. This helps reduce the size of the snapshot created for the step (only the specific folder is snapshotted). Since changes in any files in the `source_directory` would trigger a re-upload of the snapshot, this helps keep the reuse of the step when there are no changes in the `source_directory` of the step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "from azureml.pipeline.core import Pipeline\n",
        "\n",
        "#Retrieve an already attached Azure Machine Learning Compute\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "aml_compute_target = \"cpu-cluster\"\n",
        "try:\n",
        "    aml_compute = AmlCompute(ws, aml_compute_target)\n",
        "    print(\"Found existing compute target: {}\".format(aml_compute_target))\n",
        "except ComputeTargetException:\n",
        "    print(\"Creating new compute target: {}\".format(aml_compute_target))\n",
        "    \n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n",
        "                                                                min_nodes = 1, \n",
        "                                                                max_nodes = 4)    \n",
        "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n",
        "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "\n",
        "# source_directory\n",
        "source_directory = 'publish_run_train'\n",
        "# define a single step pipeline for demonstration purpose.\n",
        "trainStep = PythonScriptStep(\n",
        "    name=\"Training_Step\",\n",
        "    script_name=\"train.py\", \n",
        "    compute_target=aml_compute_target, \n",
        "    source_directory=source_directory\n",
        ")\n",
        "print(\"TrainStep created\")\n",
        "# build and validate Pipeline\n",
        "pipeline = Pipeline(workspace=ws, steps=[trainStep])\n",
        "print(\"Pipeline is built\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Publish Pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from datetime import datetime\n",
        "\n",
        "timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')\n",
        "\n",
        "pipeline_name = timenow + \"-Pipeline\"\n",
        "print(pipeline_name)\n",
        "\n",
        "published_pipeline = pipeline.publish(\n",
        "    name=pipeline_name, \n",
        "    description=pipeline_name)\n",
        "print(\"Newly published pipeline id: {}\".format(published_pipeline.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Publishing PipelineEndpoint\n",
        "Create PipelineEndpoint with required parameters: workspace, name, description and pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineEndpoint\n",
        "\n",
        "pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name=\"PipelineEndpointTest\",\n",
        "                                            pipeline=pipeline, description=\"Test description Notebook\")\n",
        "pipeline_endpoint"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Retrieving PipelineEndpoint\n",
        "\n",
        "PipelineEndpoint is uniquely defined by name and id within workspace. PipelineEndpoint in workspace can be retrived by Id or by name."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get PipelineEndpoint by Name\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n",
        "pipeline_endpoint_by_name"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get PipelineEndpoint by Id\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "#get the PipelineEndpoint Id\n",
        "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"PipelineEndpointTest\")\n",
        "endpoint_id = pipeline_endpoint_by_name.id\n",
        "\n",
        "pipeline_endpoint_by_id = PipelineEndpoint.get(workspace=ws, id=endpoint_id)\n",
        "pipeline_endpoint_by_id"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get all PipelineEndpoints in workspace\n",
        "Returns all PipelineEndpoints within workspace"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "endpoint_list = PipelineEndpoint.list(workspace=ws, active_only=True)\n",
        "endpoint_list"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### PipelineEndpoint properties"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Default Version of PipelineEndpoint\n",
        "Default version of PipelineEndpoint starts from \"0\" and increments on addition of pipelines.\n",
        "\n",
        "##### Get the Default Version"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "default_version = pipeline_endpoint_by_name.get_default_version()\n",
        "default_version"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#####  Set default version \n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name.set_default_version(\"0\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get the Published Pipeline corresponds to specific version of PipelineEndpoint"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = pipeline_endpoint_by_name.get_pipeline(\"0\")\n",
        "pipeline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get default version Published Pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = pipeline_endpoint_by_name.get_pipeline()\n",
        "pipeline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Add Published Pipeline to PipelineEndpoint, \n",
        "Adds a published pipeline (if its not present) using add() and if you want to add and set to default use add_default()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name.add(published_pipeline)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Add Published pipeline to PipelineEndpoint and set it to default version\n",
        "Adding published pipeline to PipelineEndpoint if not present and set it to default"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set Published Pipeline to PipelineEndpoint, if exists\n",
        "pipeline_endpoint_by_name.set_default(published_pipeline)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get all Versions in PipelineEndpoint\n",
        "Returns list of published pipelines and its versions"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "versions = pipeline_endpoint_by_name.list_versions()\n",
        "\n",
        "for ve in versions:\n",
        "    print(ve.version)\n",
        "    print(ve.pipeline.id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get all Published Pipelines in PipelineEndpoint\n",
        "Returns all active pipelines in PipelineEnpoint, if active_only flag is set to True."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipelines = pipeline_endpoint_by_name.list_pipelines(active_only=True)\n",
        "pipelines"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Name property of PipelineEndpoint\n",
        "PipelineEndpoint is uniquely identified by name"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "##### Set Name PipelineEndpoint"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name.set_name(name=\"NewName\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### PipelineEndpoint Submission\n",
        "PipelineEndpoint triggers specific versioned pipeline or default pipeline by:\n",
        "* Rest Endpoint \n",
        "* Submit call."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Run Pipeline by endpoint property of PipelineEndpoint\n",
        "Run specific pipeline using endpoint property of PipelineEndpoint and executing http post."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name=\"NewName\")\n",
        "\n",
        "# endpoint with id \n",
        "rest_endpoint_id =  pipeline_endpoint_by_name.endpoint\n",
        "\n",
        "# for default version pipeline\n",
        "rest_endpoint_id_without_version_with_id = rest_endpoint_id\n",
        "\n",
        "# for specific version pipeline just append version info\n",
        "version=\"0\"\n",
        "rest_endpoint_id_with_version = rest_endpoint_id_without_version_with_id+\"/\"+ version\n",
        "print(rest_endpoint_id_with_version)\n",
        "pipeline_endpoint_by_name"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# endpoint with name\n",
        "rest_endpoint_name = rest_endpoint_id.split(\"Id\", 1)[0] + \"Name?name=\" + pipeline_endpoint_by_name.name\n",
        "\n",
        "# for default version pipeline\n",
        "rest_endpoint_name_without_version = rest_endpoint_name\n",
        "\n",
        "# for specific version pipeline just append version info\n",
        "version=\"0\"\n",
        "rest_endpoint_name_with_version = rest_endpoint_name_without_version+\"&pipelineVersion=\"+ version\n",
        "print(rest_endpoint_name_with_version)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()\n",
        "\n",
        "#endpoint = pipeline_endpoint_by_name.url\n",
        "\n",
        "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint_name_with_version))\n",
        "\n",
        "# specify the param when running the pipeline\n",
        "response = requests.post(rest_endpoint_name_with_version, \n",
        "                         headers=aad_token, \n",
        "                         json={\"ExperimentName\": \"default_pipeline\",\n",
        "                               \"RunSource\": \"SDK\",\n",
        "                               \"ParameterAssignments\": {\"1\": \"united\", \"2\":\"city\"}})"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "try:\n",
        "    response.raise_for_status()\n",
        "except Exception:    \n",
        "    raise Exception('Received bad response from the endpoint: {}\\n'\n",
        "                    'Response Code: {}\\n'\n",
        "                    'Headers: {}\\n'\n",
        "                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n",
        "\n",
        "run_id = response.json().get('Id')\n",
        "print('Submitted pipeline run: ', run_id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Run Pipeline by Submit call of PipelineEndpoint \n",
        "Run specific pipeline using Submit api of PipelineEndpoint"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# submit pipeline with specific version\n",
        "run_id = pipeline_endpoint_by_name.submit(\"NewName\", pipeline_version=\"0\")\n",
        "print(run_id)\n",
        "\n",
        "# submit pipeline with default version\n",
        "run_id = pipeline_endpoint_by_name.submit(\"NewName\")\n",
        "print(run_id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Use Experiment.Submit() to Submit Pipeline\n",
        "Run specific pipeline using Experiment submit api"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment\n",
        "pipeline_run = Experiment(ws, name=\"submit_from_endpoint\").submit(pipeline_endpoint_by_name, tags={'endpoint_tag': \"1\"}, pipeline_version=\"0\")"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to setup a versioned Pipeline Endpoint",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 12,
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of PipelineEndpoint to run a specific version of the Published Pipeline"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}